package com.wichell.framework.rocketmq.client;

import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.wichell.framework.rocketmq.exception.RocketMQException;
import com.wichell.framework.util.JacksonUtil;

public class RocketMQCommonProducer {
	private Logger logger = LoggerFactory.getLogger(RocketMQCommonProducer.class);
	/**
	 * Producer group conceptually aggregates all producer instances of exactly
	 * same role, which is particularly important when transactional messages
	 * are involved.
	 * </p>
	 *
	 * For non-transactional messages, it does not matter as long as it's unique
	 * per process.
	 * </p>
	 *
	 * See {@linktourl http://rocketmq.incubator.apache.org/docs/core-concept/}
	 * for more discussion.
	 */
	private String producerGroup;
	/** RocketMQ cluster address */
	private String namesrvAddr;

	/**
	 * Timeout for sending messages.
	 */
	private int sendMsgTimeout = 3000;

	/**
	 * Compress message body threshold, namely, message body larger than 4k will
	 * be compressed on default.
	 */
	private int compressMsgBodyOverHowmuch = 1024 * 4;

	/**
	 * Maximum number of retry to perform internally before claiming sending
	 * failure in synchronous mode.
	 * </p>
	 *
	 * This may potentially cause message duplication which is up to application
	 * developers to resolve.
	 */
	private int retryTimesWhenSendFailed = 2;

	/**
	 * Maximum number of retry to perform internally before claiming sending
	 * failure in asynchronous mode.
	 * </p>
	 *
	 * This may potentially cause message duplication which is up to application
	 * developers to resolve.
	 */
	private int retryTimesWhenSendAsyncFailed = 2;

	/**
	 * Maximum allowed message size in bytes.
	 */
	private int maxMessageSize = 1024 * 1024 * 4; // 4M
	protected DefaultMQProducer producer;

	// @PostConstruct
	public void init() throws RocketMQException {
		if (StringUtils.isBlank(producerGroup)) {
			throw new RuntimeException("producerGroup不能为空");
		}
		if (StringUtils.isBlank(namesrvAddr)) {
			throw new RuntimeException("namesrvAddr不能为空");
		}
		producer = new DefaultMQProducer(producerGroup);
		producer.setNamesrvAddr(namesrvAddr);
		producer.setSendMsgTimeout(sendMsgTimeout);
		producer.setCompressMsgBodyOverHowmuch(compressMsgBodyOverHowmuch);
		producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
		producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendAsyncFailed);
		producer.setMaxMessageSize(maxMessageSize);
		try {
			producer.start();
		} catch (Exception e) {
			e.printStackTrace();
			throw new RocketMQException("创建消息生产者失败");
		}
	}

	public SendResult send(String topic, String[] tags, String keys, Map<String, Object> bodyMap)
			throws RocketMQException {
		SendResult sendResult = null;
		if (StringUtils.isEmpty(topic)) {
			throw new RocketMQException("消息主题不能为空");
		}
		String tagStr = "*";
		if (tags != null && tags.length > 0) {
			tagStr = String.join("||", tags);
		}
		try {
			Message message = new Message(topic, tagStr, keys, JacksonUtil.obj2Json(bodyMap).getBytes("UTF-8"));
			sendResult = producer.send(message);
			logger.info("发送消息结果：" + sendResult);
		} catch (Exception e) {
			e.printStackTrace();
			throw new RuntimeException("发送消息失败，失败原因：" + e.getMessage());
		}
		return sendResult;
	}

	// @PreDestroy
	public void shutdown() {
		if (producer != null) {
			producer.shutdown();
		}
	}

	public String getProducerGroup() {
		return producerGroup;
	}

	public void setProducerGroup(String producerGroup) {
		this.producerGroup = producerGroup;
	}

	public String getNamesrvAddr() {
		return namesrvAddr;
	}

	public void setNamesrvAddr(String namesrvAddr) {
		this.namesrvAddr = namesrvAddr;
	}

	public int getSendMsgTimeout() {
		return sendMsgTimeout;
	}

	public void setSendMsgTimeout(int sendMsgTimeout) {
		this.sendMsgTimeout = sendMsgTimeout;
	}

	public int getCompressMsgBodyOverHowmuch() {
		return compressMsgBodyOverHowmuch;
	}

	public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
		this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
	}

	public int getRetryTimesWhenSendFailed() {
		return retryTimesWhenSendFailed;
	}

	public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
		this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
	}

	public int getRetryTimesWhenSendAsyncFailed() {
		return retryTimesWhenSendAsyncFailed;
	}

	public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
		this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
	}

	public int getMaxMessageSize() {
		return maxMessageSize;
	}

	public void setMaxMessageSize(int maxMessageSize) {
		this.maxMessageSize = maxMessageSize;
	}

}
